package com.lecosa.flink;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;


public class FlinkMysqlCopyTable {
    public static void main(String[] args) throws Exception {
    
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        String schema = "id BIGINT ,name STRING, password STRING,age INT";
        String source_table = "student1";
        String sink_table = "studentcp";
        String flink_source_table = "mysource";
        String flink_sink_table = "mysink";

        String base_sql = "CREATE TABLE %s (%s) " +
                "WITH (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://192.168.249.112:3306/ruantong'," +
                "'connector.driver' = 'com.mysql.jdbc.Driver'," +
                "'connector.table' = '%s'," +
                " 'connector.username' = 'root'," +
                " 'connector.password' = 'root'" +
                " )";
        String source_ddl = String.format(base_sql, flink_source_table, schema, source_table);
        String sink_ddl = String.format(base_sql, flink_sink_table, schema, sink_table);

        
        System.out.println(  source_ddl);
        System.out.println(  sink_ddl);
        tableEnvironment.executeSql(source_ddl);
        tableEnvironment.executeSql(sink_ddl);

        String insertsql = String.format("insert into %s select * from %s", flink_sink_table, flink_source_table);
        tableEnvironment.executeSql(insertsql).print();
    }

    }
